package com.lntea.netty.demo.diff.echo.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;

/**
 * Title: NioEchoClientHandler.java<br>
 * Description: non-blocking NIO echo client<br>
 * Copyright: Copyright (c) 2015<br>
 * Company: Beijing Yunshan World Information Technology Co., Ltd.<br>
 *
 * <p>Connects to the given server, sends the echo string once, prints the first
 * chunk of the reply and then stops. The handler also stops when the connection
 * is closed by the peer or fails with an I/O error, and it releases both the
 * selector and the socket channel when {@link #run()} leaves its event loop.
 *
 * <p>Fatal failures while opening the selector/channel, while initiating the
 * connection, or while selecting terminate the JVM with exit status 1.
 *
 * @author lichao
 * @date 2019/11/5 14:50
 */
public class NioEchoClientHandler implements Runnable {
    /** Size in bytes of the buffer used for a single read of the reply. */
    private static final int READ_BUFFER_SIZE = 1024;
    /** Maximum time in milliseconds a single select call may block. */
    private static final long SELECT_TIMEOUT_MILLIS = 1000L;

    /** Server IP / host name. */
    private final String serverIp;
    /** Server port. */
    private final Integer serverPort;
    /** Payload sent to the server. */
    private final String echoStr;
    /** Multiplexer the client channel is registered with. */
    private Selector selector;
    /** Client channel. */
    private SocketChannel sc;
    /** Stop flag for the event loop. */
    private volatile boolean stop;

    /**
     * Opens the selector and a non-blocking socket channel. No connection is
     * attempted here; that happens in {@link #run()}.
     *
     * @param serverIp   server IP / host name to connect to
     * @param serverPort server port to connect to
     * @param echoStr    payload to send once the connection is established
     */
    public NioEchoClientHandler(String serverIp, Integer serverPort, String echoStr) {
        this.serverIp = serverIp;
        this.serverPort = serverPort;
        this.echoStr = echoStr;
        try {
            // 1. Open the multiplexer.
            selector = Selector.open();
            // 2. Open the client channel.
            sc = SocketChannel.open();
            // 3. Switch the channel to non-blocking mode.
            sc.configureBlocking(false);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    /**
     * Initiates the connection and then dispatches selector events until the
     * stop flag is set. The selector and the channel are closed on the way out.
     */
    @Override
    public void run() {
        try {
            doConnect();
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }

        try {
            while (!stop) {
                try {
                    // Poll the ready set, blocking for at most the select timeout.
                    selector.select(SELECT_TIMEOUT_MILLIS);
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> ite = selectionKeys.iterator();
                    while (ite.hasNext()) {
                        SelectionKey key = ite.next();
                        // Selected keys must be removed by hand or they are seen again.
                        ite.remove();

                        try {
                            handleInput(key);
                        } catch (IOException e) {
                            // The only channel of this client is unusable now: report the
                            // failure, drop the channel and leave the loop instead of
                            // polling an empty selector forever.
                            e.printStackTrace();
                            key.cancel();
                            if (key.channel() != null) {
                                key.channel().close();
                            }
                            stop = true;
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                    System.exit(1);
                }
            }
        } finally {
            releaseResources();
        }
    }

    /**
     * Closes the socket channel and the selector. Closing the selector only
     * deregisters channels, so the channel is closed explicitly.
     */
    private void releaseResources() {
        if (sc != null) {
            try {
                sc.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (selector != null) {
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Handles a ready key: completes a pending connect and/or reads the reply.
     *
     * @param key the selected key; ignored when it is no longer valid
     * @throws IOException if finishing the connection, writing the request or
     *                     reading the reply fails
     */
    private void handleInput(SelectionKey key) throws IOException {
        if (!key.isValid()) {
            return;
        }
        SocketChannel channel = (SocketChannel) key.channel();

        // Connect event: finish the handshake, switch to read interest, send the request.
        if (key.isConnectable()) {
            if (channel.finishConnect()) {
                channel.register(selector, SelectionKey.OP_READ);
                doWrite(channel);
            } else {
                System.exit(1);
            }
        }

        // Read event: print whatever arrived in this read and stop the client.
        if (key.isReadable()) {
            ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
            int readBufferSize = channel.read(readBuffer);
            if (readBufferSize > 0) {
                readBuffer.flip();

                byte[] bytes = new byte[readBuffer.remaining()];
                readBuffer.get(bytes);
                String body = new String(bytes, StandardCharsets.UTF_8);
                System.out.println(String.format("Echo Client receive echo time: %s", body));
                this.stop = true;
            } else if (readBufferSize < 0) {
                // Peer closed the link: release it and end the loop, since no
                // further event can ever arrive for this client.
                key.cancel();
                channel.close();
                this.stop = true;
            }
            // readBufferSize == 0: nothing arrived yet, keep waiting.
        }
    }

    /**
     * Starts connecting to the server. If the connection is established
     * immediately the channel is registered for reads and the request is sent;
     * otherwise the channel is registered for the connect event.
     *
     * @throws IOException if the connect attempt, registration or write fails
     */
    private void doConnect() throws IOException {
        if (sc.connect(new InetSocketAddress(serverIp, serverPort))) {
            sc.register(selector, SelectionKey.OP_READ);
            doWrite(sc);
        } else {
            sc.register(selector, SelectionKey.OP_CONNECT);
        }
    }

    /**
     * Sends the echo string to the server, UTF-8 encoded (the same charset used
     * to decode the reply).
     *
     * @param sc connected channel to write to
     * @throws IOException if writing to the channel fails
     */
    private void doWrite(SocketChannel sc) throws IOException {
        // Wrap the encoded bytes directly so payloads of any length fit.
        ByteBuffer writeBuffer = ByteBuffer.wrap(echoStr.getBytes(StandardCharsets.UTF_8));
        // A non-blocking write may accept only part of the buffer; keep writing
        // until the whole request has been handed to the socket.
        while (writeBuffer.hasRemaining()) {
            sc.write(writeBuffer);
        }
    }
}
